package com.ylh.untils;

import com.ylh.bean.PtVisitCnt;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
 * Flink sink that writes each {@link PtVisitCnt} record into the ClickHouse
 * table {@code pt_list} through JDBC.
 *
 * <p>The JDBC connection and the prepared insert statement are created in
 * {@link #open(Configuration)} and released in {@link #close()}. Every record
 * passed to {@link #invoke(PtVisitCnt, Context)} is written with its own
 * {@code executeUpdate()} call (no batching).
 */
public class SinkClickHouse extends RichSinkFunction<PtVisitCnt> {
    // Created in open(), never part of the serialized function: JDBC handles are
    // not serializable, so they are marked transient.
    transient Connection connection = null;
    transient PreparedStatement ps = null;

    /**
     * Loads the ClickHouse JDBC driver, opens the connection and prepares the
     * three-column insert statement used by {@link #invoke(PtVisitCnt, Context)}.
     *
     * @param parameters configuration forwarded to the superclass
     * @throws Exception if the driver class cannot be loaded, or the connection
     *                   or statement cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        connection = DriverManager.getConnection("jdbc:clickhouse://hadoop-single:8123/default?characterEncoding=utf8", "default", "");
        ps = connection.prepareStatement("insert into pt_list values (?,?,?)");
    }

    /**
     * Releases the prepared statement and the connection.
     *
     * <p>Both resources are null-checked because {@code open()} may have failed
     * before assigning them, and the nested {@code finally} blocks make sure the
     * connection is still closed when closing the statement (or the superclass)
     * throws.
     *
     * @throws Exception if the superclass or one of the JDBC resources fails to close
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            try {
                if (ps != null) {
                    ps.close();
                }
            } finally {
                ps = null;
                if (connection != null) {
                    try {
                        connection.close();
                    } finally {
                        connection = null;
                    }
                }
            }
        }
    }

    /**
     * Inserts one record into {@code pt_list}.
     *
     * <p>Parameter order is: page id, {@code pt}, count.
     *
     * @param value   the record to write
     * @param context sink context, forwarded to the superclass only
     * @throws Exception if binding the parameters or executing the insert fails
     */
    @Override
    public void invoke(PtVisitCnt value, Context context) throws Exception {
        super.invoke(value, context);
        ps.setString(1, value.getPage_id());
        ps.setString(2, value.getPt());
        ps.setInt(3, value.getCnt());
        ps.executeUpdate();
    }
}
